#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

library(testthat)

context("MLlib clustering algorithms")

# Tests for MLlib clustering algorithms in SparkR.
# The Spark session is created here, at file level, before any test_that() block runs.
# Hive support is switched off. `sparkRTestMaster` is not defined in this file and must
# already be in scope when the file is sourced.
sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)

# Build a path underneath the directory stored in the "spark.home" conf entry.
#
# @param x path fragment to append (e.g. "data/mllib/sample_lda_data.txt").
# @return the result of joining the "spark.home" value and `x` with file.path().
absoluteSparkPath <- function(x) {
  file.path(sparkR.conf("spark.home"), x)
}

# Fits a bisecting k-means model on the numeric iris columns and checks prediction,
# fitted(), summary() and (where supported) model persistence.
test_that("spark.bisectingKmeans", {
  # Species is dropped so that only the four numeric measurements are used as features.
  newIris <- iris
  newIris$Species <- NULL
  # Warnings raised while converting iris to a SparkDataFrame are deliberately suppressed.
  training <- suppressWarnings(createDataFrame(newIris))

  take(training, 1)

  model <- spark.bisectingKmeans(data = training, ~ .)
  # Named firstPrediction (not `sample`) to avoid shadowing base::sample.
  firstPrediction <- take(select(predict(model, training), "prediction"), 1)
  expect_equal(typeof(firstPrediction$prediction), "integer")
  expect_equal(firstPrediction$prediction, 1)

  # Test fitted works on Bisecting KMeans
  fitted.model <- fitted(model)
  expect_equal(sort(collect(distinct(select(fitted.model, "prediction")))$prediction),
               c(0, 1, 2, 3))

  # Test summary works on Bisecting KMeans
  summary.model <- summary(model)
  cluster <- summary.model$cluster
  k <- summary.model$k
  expect_equal(k, 4)
  expect_equal(sort(collect(distinct(select(cluster, "prediction")))$prediction),
               c(0, 1, 2, 3))

  # Test model save/load
  if (windows_with_hadoop()) {
    modelPath <- tempfile(pattern = "spark-bisectingkmeans", fileext = ".tmp")
    write.ml(model, modelPath)
    # Writing to an existing path without overwrite = TRUE must fail.
    expect_error(write.ml(model, modelPath))
    write.ml(model, modelPath, overwrite = TRUE)
    model2 <- read.ml(modelPath)
    summary2 <- summary(model2)
    expect_equal(sort(unlist(summary.model$size)), sort(unlist(summary2$size)))
    expect_equal(summary.model$coefficients, summary2$coefficients)
    expect_false(summary.model$is.loaded)
    expect_true(summary2$is.loaded)

    # The saved model is a directory tree; unlink() leaves directories in place
    # unless recursive = TRUE, so the flag is required for the cleanup to happen.
    unlink(modelPath, recursive = TRUE)
  }
})

# Fits a two-component Gaussian mixture on a small fixed data set and compares the
# estimates with reference values obtained in plain R (see the commented recipe below).
test_that("spark.gaussianMixture", {
  # R code to reproduce the result.
  # nolint start
  #' library(mvtnorm)
  #' library(mixtools)  # provides mvnormalmixEM
  #' set.seed(1)
  #' a <- rmvnorm(7, c(0, 0))
  #' b <- rmvnorm(8, c(10, 10))
  #' data <- rbind(a, b)
  #' model <- mvnormalmixEM(data, k = 2)
  #' model$lambda
  #
  #  [1] 0.4666667 0.5333333
  #
  #' model$mu
  #
  #  [1] 0.11731091 -0.06192351
  #  [1] 10.363673  9.897081
  #
  #' model$sigma
  #
  #  [[1]]
  #             [,1]       [,2]
  #  [1,] 0.62049934 0.06880802
  #  [2,] 0.06880802 1.27431874
  #
  #  [[2]]
  #            [,1]     [,2]
  #  [1,] 0.2961543 0.160783
  #  [2,] 0.1607830 1.008878
  #
  #' model$loglik
  #
  #  [1] -46.89499
  # nolint end
  data <- list(list(-0.6264538, 0.1836433), list(-0.8356286, 1.5952808),
               list(0.3295078, -0.8204684), list(0.4874291, 0.7383247),
               list(0.5757814, -0.3053884), list(1.5117812, 0.3898432),
               list(-0.6212406, -2.2146999), list(11.1249309, 9.9550664),
               list(9.9838097, 10.9438362), list(10.8212212, 10.5939013),
               list(10.9189774, 10.7821363), list(10.0745650, 8.0106483),
               list(10.6198257, 9.9438713), list(9.8442045, 8.5292476),
               list(9.5218499, 10.4179416))
  df <- createDataFrame(data, c("x1", "x2"))
  model <- spark.gaussianMixture(df, ~ x1 + x2, k = 2)
  stats <- summary(model)
  # Reference values copied from the R output quoted above, flattened into vectors.
  rLambda <- c(0.4666667, 0.5333333)
  rMu <- c(0.11731091, -0.06192351, 10.363673, 9.897081)
  rSigma <- c(0.62049934, 0.06880802, 0.06880802, 1.27431874,
              0.2961543, 0.160783, 0.1607830, 1.008878)
  rLoglik <- -46.89499
  expect_equal(stats$lambda, rLambda, tolerance = 1e-3)
  expect_equal(unlist(stats$mu), rMu, tolerance = 1e-3)
  expect_equal(unlist(stats$sigma), rSigma, tolerance = 1e-3)
  expect_equal(unlist(stats$loglik), rLoglik, tolerance = 1e-3)
  # The first 7 rows are expected in component 0 and the remaining 8 in component 1.
  p <- collect(select(predict(model, df), "prediction"))
  expect_equal(p$prediction, c(0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1))

  # Test model save/load
  if (windows_with_hadoop()) {
    modelPath <- tempfile(pattern = "spark-gaussianMixture", fileext = ".tmp")
    write.ml(model, modelPath)
    # Writing to an existing path without overwrite = TRUE must fail.
    expect_error(write.ml(model, modelPath))
    write.ml(model, modelPath, overwrite = TRUE)
    model2 <- read.ml(modelPath)
    stats2 <- summary(model2)
    expect_equal(stats$lambda, stats2$lambda)
    expect_equal(unlist(stats$mu), unlist(stats2$mu))
    expect_equal(unlist(stats$sigma), unlist(stats2$sigma))
    expect_equal(unlist(stats$loglik), unlist(stats2$loglik))

    # The saved model is a directory tree; unlink() leaves directories in place
    # unless recursive = TRUE, so the flag is required for the cleanup to happen.
    unlink(modelPath, recursive = TRUE)
  }
})

# Fits k-means models and checks prediction, fitted(), summary(), persistence and the
# effect of the seed on a small data set with duplicated points.
test_that("spark.kmeans", {
  # Species is dropped so that only the four numeric measurements are used as features.
  newIris <- iris
  newIris$Species <- NULL
  # Warnings raised while converting iris to a SparkDataFrame are deliberately suppressed.
  training <- suppressWarnings(createDataFrame(newIris))

  take(training, 1)

  model <- spark.kmeans(data = training, ~ ., k = 2, maxIter = 10, initMode = "random")
  # Named firstPrediction (not `sample`) to avoid shadowing base::sample.
  firstPrediction <- take(select(predict(model, training), "prediction"), 1)
  expect_equal(typeof(firstPrediction$prediction), "integer")
  expect_equal(firstPrediction$prediction, 0)

  # Test stats::kmeans is working
  statsModel <- kmeans(x = newIris, centers = 2)
  expect_equal(sort(unique(statsModel$cluster)), c(1, 2))

  # Test fitted works on KMeans
  fitted.model <- fitted(model)
  expect_equal(sort(collect(distinct(select(fitted.model, "prediction")))$prediction), c(0, 1))

  # Test summary works on KMeans
  summary.model <- summary(model)
  cluster <- summary.model$cluster
  k <- summary.model$k
  expect_equal(k, 2)
  expect_equal(sort(collect(distinct(select(cluster, "prediction")))$prediction), c(0, 1))

  # test summary coefficients return matrix type
  # is.matrix() / a full class comparison replace element-wise `class(x) == "..."` tests.
  coefficients <- summary.model$coefficients
  expect_true(is.matrix(coefficients))
  expect_equal(class(coefficients[1, ]), "numeric")

  # Test model save/load
  if (windows_with_hadoop()) {
    modelPath <- tempfile(pattern = "spark-kmeans", fileext = ".tmp")
    write.ml(model, modelPath)
    # Writing to an existing path without overwrite = TRUE must fail.
    expect_error(write.ml(model, modelPath))
    write.ml(model, modelPath, overwrite = TRUE)
    model2 <- read.ml(modelPath)
    summary2 <- summary(model2)
    expect_equal(sort(unlist(summary.model$size)), sort(unlist(summary2$size)))
    expect_equal(summary.model$coefficients, summary2$coefficients)
    expect_false(summary.model$is.loaded)
    expect_true(summary2$is.loaded)

    # The saved model is a directory tree; unlink() leaves directories in place
    # unless recursive = TRUE, so the flag is required for the cleanup to happen.
    unlink(modelPath, recursive = TRUE)
  }

  # Test Kmeans on dataset that is sensitive to seed value
  # Only 5 distinct points exist (each repeated twice) while k = 5 is requested.
  col1 <- c(1, 2, 3, 4, 0, 1, 2, 3, 4, 0)
  col2 <- c(1, 2, 3, 4, 0, 1, 2, 3, 4, 0)
  col3 <- c(1, 2, 3, 4, 0, 1, 2, 3, 4, 0)
  cols <- as.data.frame(cbind(col1, col2, col3))
  df <- createDataFrame(cols)

  model1 <- spark.kmeans(data = df, ~ ., k = 5, maxIter = 10,
                         initMode = "random", seed = 1, tol = 1E-5)
  model2 <- spark.kmeans(data = df, ~ ., k = 5, maxIter = 10,
                         initMode = "random", seed = 22222, tol = 1E-5)

  summary.model1 <- summary(model1)
  summary.model2 <- summary(model2)
  cluster1 <- summary.model1$cluster
  cluster2 <- summary.model2$cluster
  clusterSize1 <- summary.model1$clusterSize
  clusterSize2 <- summary.model2$clusterSize

  # The two seeds are expected to yield a different number of clusters (4 vs 3)
  expect_equal(sort(collect(distinct(select(cluster1, "prediction")))$prediction),
               c(0, 1, 2, 3))
  expect_equal(sort(collect(distinct(select(cluster2, "prediction")))$prediction),
               c(0, 1, 2))
  expect_equal(clusterSize1, 4)
  expect_equal(clusterSize2, 3)
})

# Fits LDA with the "em" optimizer on libsvm-formatted input and checks the summary
# statistics, then (where supported) that they survive a save/load round trip.
test_that("spark.lda with libsvm", {
  text <- read.df(absoluteSparkPath("data/mllib/sample_lda_libsvm_data.txt"), source = "libsvm")
  model <- spark.lda(text, optimizer = "em")

  stats <- summary(model, 10)
  isDistributed <- stats$isDistributed
  logLikelihood <- stats$logLikelihood
  logPerplexity <- stats$logPerplexity
  vocabSize <- stats$vocabSize
  vocabulary <- stats$vocabulary
  trainingLogLikelihood <- stats$trainingLogLikelihood
  logPrior <- stats$logPrior

  expect_true(isDistributed)
  # `&&` is used because each condition is a single scalar value.
  expect_true(logLikelihood <= 0 && is.finite(logLikelihood))
  expect_true(logPerplexity >= 0 && is.finite(logPerplexity))
  expect_equal(vocabSize, 11)
  # With libsvm input the summary is expected to carry no vocabulary.
  expect_true(is.null(vocabulary))
  expect_true(trainingLogLikelihood <= 0 && !is.na(trainingLogLikelihood))
  expect_true(logPrior <= 0 && !is.na(logPrior))

  # Test model save/load
  if (windows_with_hadoop()) {
    modelPath <- tempfile(pattern = "spark-lda", fileext = ".tmp")
    write.ml(model, modelPath)
    # Writing to an existing path without overwrite = TRUE must fail.
    expect_error(write.ml(model, modelPath))
    write.ml(model, modelPath, overwrite = TRUE)
    model2 <- read.ml(modelPath)
    stats2 <- summary(model2)

    expect_true(stats2$isDistributed)
    expect_equal(logLikelihood, stats2$logLikelihood)
    expect_equal(logPerplexity, stats2$logPerplexity)
    expect_equal(vocabSize, stats2$vocabSize)
    expect_equal(vocabulary, stats2$vocabulary)
    expect_equal(trainingLogLikelihood, stats2$trainingLogLikelihood)
    expect_equal(logPrior, stats2$logPrior)

    # The saved model is a directory tree; unlink() leaves directories in place
    # unless recursive = TRUE, so the flag is required for the cleanup to happen.
    unlink(modelPath, recursive = TRUE)
  }
})

# Fits LDA with the "online" optimizer on raw text input and checks the summary
# statistics, then (where supported) that they survive a save/load round trip.
test_that("spark.lda with text input", {
  text <- read.text(absoluteSparkPath("data/mllib/sample_lda_data.txt"))
  model <- spark.lda(text, optimizer = "online", features = "value")

  stats <- summary(model)
  isDistributed <- stats$isDistributed
  logLikelihood <- stats$logLikelihood
  logPerplexity <- stats$logPerplexity
  vocabSize <- stats$vocabSize
  vocabulary <- stats$vocabulary
  trainingLogLikelihood <- stats$trainingLogLikelihood
  logPrior <- stats$logPrior

  expect_false(isDistributed)
  # `&&` is used because each condition is a single scalar value.
  expect_true(logLikelihood <= 0 && is.finite(logLikelihood))
  expect_true(logPerplexity >= 0 && is.finite(logPerplexity))
  expect_equal(vocabSize, 10)
  expect_true(setequal(vocabulary, c("0", "1", "2", "3", "4", "5", "6", "7", "8", "9")))
  # These two statistics are expected to be NA for this (non-distributed) model.
  expect_true(is.na(trainingLogLikelihood))
  expect_true(is.na(logPrior))

  # Test model save/load
  # Guarded by windows_with_hadoop() like every other save/load section in this file.
  if (windows_with_hadoop()) {
    modelPath <- tempfile(pattern = "spark-lda-text", fileext = ".tmp")
    write.ml(model, modelPath)
    # Writing to an existing path without overwrite = TRUE must fail.
    expect_error(write.ml(model, modelPath))
    write.ml(model, modelPath, overwrite = TRUE)
    model2 <- read.ml(modelPath)
    stats2 <- summary(model2)

    expect_false(stats2$isDistributed)
    expect_equal(logLikelihood, stats2$logLikelihood)
    expect_equal(logPerplexity, stats2$logPerplexity)
    expect_equal(vocabSize, stats2$vocabSize)
    # expect_equal() reports a readable diff on mismatch; all.equal() inside
    # expect_true() would hand a character vector to expect_true() instead.
    expect_equal(vocabulary, stats2$vocabulary)
    expect_true(is.na(stats2$trainingLogLikelihood))
    expect_true(is.na(stats2$logPrior))

    # The saved model is a directory tree; unlink() leaves directories in place
    # unless recursive = TRUE, so the flag is required for the cleanup to happen.
    unlink(modelPath, recursive = TRUE)
  }
})

# Checks spark.perplexity() against the value reported by summary(), and checks the
# total probability mass returned by spark.posterior().
test_that("spark.posterior and spark.perplexity", {
  corpus <- read.text(absoluteSparkPath("data/mllib/sample_lda_data.txt"))
  ldaModel <- spark.lda(corpus, features = "value", k = 3)

  # The perplexity computed on the training data must match the one in the summary
  summaryPerplexity <- summary(ldaModel)$logPerplexity
  computedPerplexity <- spark.perplexity(ldaModel, corpus)
  expect_equal(computedPerplexity, summaryPerplexity)

  # The topic distributions together must sum to the number of documents,
  # which is what happens when each individual distribution sums to 1
  topicDistributions <- collect(spark.posterior(ldaModel, corpus))$topicDistribution
  totalMass <- sum(unlist(topicDistributions))
  expect_equal(length(topicDistributions), totalMass)
})

# Runs power iteration clustering on a small weighted edge list and checks which
# nodes end up grouped together.
test_that("spark.assignClusters", {
  # Edge list (src, dst, weight): nodes 0, 1, 2 are pairwise linked with weight 1.0,
  # nodes 3 and 4 are linked with weight 1.0, and 4-0 is a weak (0.1) link.
  df <- createDataFrame(list(list(0L, 1L, 1.0), list(0L, 2L, 1.0),
                             list(1L, 2L, 1.0), list(3L, 4L, 1.0),
                             list(4L, 0L, 0.1)),
                        schema = c("src", "dst", "weight"))
  clusters <- spark.assignClusters(df, initMode = "degree", weightCol = "weight")

  # Expected assignment, ordered by node id.
  expected <- data.frame(id = c(0L, 1L, 2L, 3L, 4L),
                         cluster = c(0L, 0L, 0L, 1L, 1L))

  # Comparing two SparkDataFrame handles with expect_equivalent() never looks at the
  # rows, so the result is collected and compared as local data instead.
  actual <- collect(clusters)
  actual <- actual[order(actual$id), ]
  expect_equal(as.integer(actual$id), expected$id)

  # Cluster ids are arbitrary labels: relabel them by order of first appearance so
  # that only the grouping of nodes is compared, not the label numbering.
  canonicalLabels <- function(labels) match(labels, unique(labels))
  expect_equal(canonicalLabels(actual$cluster), canonicalLabels(expected$cluster))
})

# Stop the Spark session once all test_that() blocks above have run.
sparkR.session.stop()
